[Pipeline RL] Add support for PipelineRL #428
jlamypoirier wants to merge 112 commits into jlp_entropy_loss_tweaks from
Conversation
| backend="nccl", | ||
| init_method=init_method, | ||
| world_size=config.broadcast.external_world_size + 1, | ||
| rank=0, |
Maybe this should be configurable, since the external system may not treat us as rank 0.
We also have control over the external system, so we can make it treat us as rank 0. It's easier to keep the two programs in sync with hard-coded values than by syncing config files.
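If configurability ever becomes necessary, one option is to expose the rank as a config field whose default stays at the current hard-coded value, so both programs stay in agreement unless explicitly overridden. A minimal sketch, assuming hypothetical field names (`config.broadcast.rank` does not exist in this diff):

```python
# Illustrative only: keep rank 0 as the default so nothing changes unless the
# external system explicitly asks for a different rank.
import torch.distributed as dist

def init_broadcast_group(config, init_method: str) -> None:
    # `external_world_size` counts the external workers; this process adds one.
    world_size = config.broadcast.external_world_size + 1
    # Hypothetical field; defaults to the current hard-coded value.
    rank = getattr(config.broadcast, "rank", 0)
    dist.init_process_group(
        backend="nccl",
        init_method=init_method,
        world_size=world_size,
        rank=rank,
    )
```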
Maybe name it tests/models/test_streaming_training_callbacks.py or something similar?
Or perhaps it would be better under tests/engine/trainer/ or tests/trainer/, since this is testing trainer callbacks with a streaming dataset rather than model logic (with the exception of the tensor iterator).
This needs to be in tests/models because it uses the model_configs machinery. I tend to prefer concise names, so test_streaming seems appropriate.
worker_resources: WorkerResources,
report_subtest,
):
    report_subtest(path := run_test_script_base_path / config.name, config.total_gpus)
Here we need to check whether we have enough GPUs for a subtest; otherwise, it will incorrectly report the test as failed with “did not run.”
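A minimal sketch of such a guard, assuming a hypothetical `_check_gpus` helper (the repository's actual fixtures may handle this differently):

```python
# Illustrative only: skip the subtest when the host has fewer GPUs than the
# config requires, instead of reporting "did not run" as a failure.
import pytest
import torch

def _check_gpus(required: int) -> None:
    available = torch.cuda.device_count()
    if available < required:
        pytest.skip(f"Requires {required} GPUs, only {available} available.")

# Usage inside the test body, before reporting the subtest:
#     _check_gpus(config.total_gpus)
#     report_subtest(run_test_script_base_path / config.name, config.total_gpus)
```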
tests/data/test_streaming.py
Outdated
@pytest.mark.depends_on(on=["test_data_streaming"])
@pytest.mark.parametrize(("name", "num_gpus", "distributed_config_dict"), _DISTRIBUTED_TESTING_CONFIGS)
def test_data_streaming_distributed(result_path, name, num_gpus, distributed_config_dict, report_subtest):
    report_subtest(path := result_path / f"data_streaming/{name}", num_gpus)
Here we need to check whether we have enough GPUs for a subtest; otherwise, it will incorrectly report the test as failed with “did not run.”
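Since the same check is needed in both streaming tests, it could also live in a shared fixture; a sketch with a hypothetical `requires_gpus` fixture name:

```python
# Illustrative only: a reusable fixture that skips a parametrized test at
# runtime when the requested number of GPUs is not available.
import pytest
import torch

@pytest.fixture
def requires_gpus():
    def _requires(num_gpus: int) -> None:
        available = torch.cuda.device_count()
        if available < num_gpus:
            pytest.skip(f"Needs {num_gpus} GPUs, found {available}.")
    return _requires

# def test_data_streaming_distributed(..., requires_gpus, report_subtest):
#     requires_gpus(num_gpus)
#     report_subtest(result_path / f"data_streaming/{name}", num_gpus)
```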
fast_llm/data/dataset/config.py
Outdated
_abstract = False

acknowledge_interval: int = Field(
This is also not needed if we only use consumer groups; see the above comment for the implementation.
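For illustration, assuming the streaming dataset reads from a Redis-stream-like broker (an assumption, not something stated in this diff), a consumer group tracks delivery per message, so each sample can be acknowledged as soon as it is consumed and no separate acknowledge interval is needed:

```python
# Illustrative only: per-message acknowledgment with a Redis consumer group.
import redis

client = redis.Redis()
stream, group, consumer = "rollouts", "trainer", "rank_0"  # hypothetical names

try:
    client.xgroup_create(stream, group, id="0", mkstream=True)
except redis.ResponseError:
    pass  # The consumer group already exists.

# Read a batch of pending samples for this consumer and acknowledge each one
# as soon as it is handed off to the training iterator.
entries = client.xreadgroup(group, consumer, {stream: ">"}, count=16, block=1000)
for _, messages in entries or []:
    for message_id, fields in messages:
        # ... convert `fields` into a training sample here ...
        client.xack(stream, group, message_id)
```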
What does this do? Is it feature complete? What is the next step?
Co-authored-by: oleksost <ostapy2@gmail.com>
This PR provides the initial integration with PipelineRL using the GRPO loss.
It introduces:
- `training_started`, `step_finished`, and `training_finished` trainer callbacks. This enables seamless coordination between Fast-LLM training and PipelineRL-based inference or orchestration components.
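A minimal sketch of what such a callback surface could look like (class and argument names are assumptions, not the actual Fast-LLM interface):

```python
# Illustrative only: a protocol describing the three lifecycle hooks named above.
import typing

class TrainingCallback(typing.Protocol):
    def training_started(self) -> None:
        """Called once before the first training step, e.g. to signal
        PipelineRL components that weight broadcasts are about to begin."""

    def step_finished(self, step: int) -> None:
        """Called after each training step, e.g. to publish updated weights
        or metrics to inference workers."""

    def training_finished(self) -> None:
        """Called once after the last step, e.g. to shut down streaming."""
```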